Python高级


一、内存管理

1 内部结构

1.1 环状双向链表

环状双向链表(refchain)。在python程序中创建的任何对象都会放在refchain链表中。

static PyObject refchain = {&refchain, &refchain, ...}

假设我name = "Bob",内部的结构体会存上:上一对象、下一对象、类型、引用个数等内容。如果此时我再new = name,那么引用个数则会增加1个。

对于所有对象来说,内部结构体都会存上上一对象、下一对象、类型、引用个数,而不同的数据类型还会存储一切额外内容。比如创建int还会将int值存入结构体,创建list还会将列表,列表长度存入结构体。

综上:每个对象都有PyObject结构体,由多个元素组成的对象则是:PyObject + ob_size

// C语言源码
#define Pyobject_HEAD Pyobject ob_base;
#define Pyobject_VAR_HEAD PyVarobject ob_base;

// 宏定义,包含 上一个、下一个,用于构造双向链表(放到refchain链表中时要用到)
#define _Pyobject_HEAD_EXTRA \
struct _object *_ob_next; \
struct _object *_ob_prev;

typedef struct _object {
_PyObject_HEAD_EXTRA // 用于构造双向链表
Py_ssize_t ob_refcnt; // 引用计数器
struct _typeobject *ob_type; // 数据类型
} PyObject;

typedef struct {
PyObject ob_base; // Pyobject对象
Py_ssize_t ob_size; // Number of items in variable part,即:元素个数
} PyVarObject;

1.2 具体类型结构体

假设设置了data=3.14,那么内部会创建:

  • _ob_prev = refchain 中上一个对象
  • _ob_next = refchain 中下一个对象
  • ob_refcnt = 1
  • ob_type = float
  • ob_fval = 3.14
// float
typedef struct {
PyObject_HEAD
double ob_fval;
} PyFloatObject;


// int
struct _longobject {
PyObject_HEAD
digit ob_digit[1];
}
/* Long (arbitrary precision) integer object inerface */
typedef struct _longobject PyLongObject; /* Revealed in longintrepr.h */


// list
typedef struct {
PyObject_VAR_HEAD
PyObject **ob_item;
Py_ssize_t allocated;
} PyListObject;


// tuple
typedef struct {
PyObject_VAR_HEAD
PyObject **ob_item[1];
} PyTupleObject;


// dict
typedef struct {
PyObject_HEAD
Py_ssize_t ma_used;
PyDictKeysObject *ma_keys;
PyObject **ma_values;
} PyDictObject;

1.3 引用计数器

当python程序运行时,会根据数据类型的不同找到其对应的结构体,根据结构体中的字段来进行创建相关的数据。然后将对象添加到refchain双向链表中。

在C源码中有两个关键的结构体:PyObjectPyVarObject

每个对象中的ob_refcnt就是引用计数器,值默认为1,当有其他变量引用对象时,引用计数器就会发生变化。

当一个对象的引用计数器为0时,意味着没有人再使用这个对象了,这个对象就是垃圾,此时进行垃圾回收。

垃圾回收简单理解步骤(实际不止,可以看后续):

  1. 对象从refchain链表移除。
  2. 将对象销毁,内存归还。
""" 注意,下列a、b对应的对象实际上是同一个 """
# a变量赋值:a对应对象引用计数器 + 1
a = 99999

# b = a:b对应对象引用计数器 + 1
b = a

# b变量删除:b对应对象引用计数器 - 1
del b

# a变量删除:a对应对象引用计数器 - 1
del a

1.4 循环引用问题

只使用引用计数器来管理内存看似完美,实际上会存在循环引用问题。

v1 = [11, 22, 33]  # 创建列表对象v1并赋值,对象引用计数器为1
v2 = [44, 55, 66] # 创建列表对象v2并赋值,对象引用计数器为1

# 把v2追加到v1中,v2对应的[44, 55, 66]对象的引用计数器加1,变为2
v1.append(v2)
# 把v1追加到v2中,v1对应的[11, 22, 33]对象的引用计数器加1,变为2
v2.append(v1)


# 删除变量v1,v1对象引用计数器减1
del v1
# 删除变量v2,v2对象引用计数器减1
del v2

""" 此时v1,v2对象引用计数器还有1,那么这两个列表此时就会常驻在内存中 """

2 标记清除

目的:为解决引用计数器循环引用的不足。

实现:在python的底层再维护一个链表,链表中专门放那些可能存在循环引用的对象(list / tuple / dict / set)

在Python内部某种情况下触发,回去扫描可能存在循环应用的链表中的每个元素,检查是否有循环引用,如果有则让双方的引用计数器 -1。如果是0则垃圾回收。

3 分代回收

标记清除问题:

  • 什么时候扫描?
  • 可能存在循环引用的链表扫描代价大,每次扫描耗时久。

于是python引入了分代回收机制,在python内存管理系统将内存分为不同的世代,新创建的对象首先放在第0代。经过多次垃圾回收周期,如果对象依然存活,则会被提升至老一代,以此类推。

标记清除维护的列表就是指的是分代回收的三个列表,它包含了可能存在循环引用的对象,通过定期扫描这个列表来处理可能的循环引用问题。

将可能存在循环应用的对象维护成3个链表:

  • 0代:0代中对象个数达到700个则扫描一次。
  • 1代:0代扫描10次,则1代扫描一次。
  • 2代:1代扫描10次,则2代扫描一次。

4 缓存机制

4.1 总结

在python中维护了一个refchain的双向环状链表,这个链表中存储程序创建的所有对象,每种类型的对象中都有一个ob_refcnt引用计数器的值,引用个数+1、-1,最后当引用计数器变为0时会进行垃圾回收(对象销毁、refchain中移除)。

但是,在python中对于那些可以有多个元素组成的对象可能会存在循环引用的问题,为了解决这个问题python又引入了标记清除和分代回收,在其内部为了4个链表。

在源码内部当达到各自的阈值时,就会触发扫描链表进行标记清除的动作(有循环则各自-1)。

4.2 池

作用于int / str类型。

为了避免重复创建和销毁一些常见对象,于是会维护一个池。

在启动解释器时,对于int类型,python内部会帮我们创建:[-5, 256]。对于str类型,会维护一个unicode_latin1[256]的链表,内部存所有的ascii字符,之后使用就不会再重复创建。

字符串驻留机制:Python会对长度为0到20个字符的由英文字符,数字,下划线构成的字符串进行驻留,下次再创建时,不会新开辟内存,通过id()可以发现值相等。

v1 = 7  # 内存不会开辟内存,直接从池中取的
v2 = 9 # 内存不会开辟内存,直接从池中取的
v3 = 9 # 内存不会开辟内存,直接从池中取的

# 地址一样,都是池里面拿的
print(id(v2), id(v3))

4.3 free_list

作用于:float / list / tuple / dict

当一个对象的引用计数器为0时,按理说应该回收,但是内部实际上不会直接回收,而是将对象添加到free_list链表中当缓存。以后再去创建对象时,不会重新开辟内存,而是直接使用free_list

free_list是由上限的,当free_list没满时引用计数器为0的对象才会加到free_list中,满了的话则会直接销毁对象。

# 开辟内存,内部存储结构中初始化值后存在refchain中
v1 = 3.14

# 从refchain中移除,将对象添加到free_list中
del v1

# 不会重新开辟内存,去free_list中获取对象,对象内部数据初始化后,再放到refchain中
v9 = 999.99

5 源码剖析

5.1 float 类型

创建

val = 3.14

int类型和float很相似,只是int类型还会先去小数据池里找,没有才会创建。

// Objects/floatobject.c

// 用于缓存f1oat对象的链表
static PyFloatObject *free_list = NULL;
static int numfree = 0;

PyObject *PyFloat_FromDouble(double fval) {
// 如果free_list中有可用对象,则从free_list链表拿出来一个;否则为对象重新分配内存。
PyFloatobject *op = free_list;
if (op != NULL) {
free_list = (PyFloatobject *) Py_TYPE(op);
numfree--;
} else {
// 根据float类型的大小,为f1oat对象重新分配内存。
op = (PyFloatobject *) PyObject_MALLOC(sizeof(PyFloatobject));
if (!op)
return PyErr_NoMemory();
}
// 对float对象进行初始化,例如:引用计数器初始化为1、添加到refchain链表等。
/* Inline PyObject_New */
(void)PyObject_INIT(op, &PyFloat_Type);

// 对float对象赋值。即:op->ob_fval = fval;
op->ob_fval = fval;

return (PyObject *)op;
}
// Include/objimpl.h

/*
(Py_TYPE(op) = (typeobj)): 设置op对象的类型为typeobj。
_Py_NewReference((PyObject *)(op)): 为op对象增加一个引用计数。
(op): 返回初始化后的PyObject对象。
*/

#define PyObject_INIT(op, typeobj) (Py_TYPE(op) = (typeobj), _Py_NewReference((PyObject *)(op)), (op))
// Obejct/object.c

// 维护了所有对象的一个环状双向链表
static PyObejct refchain = {&refchain, &refchain};

void _Py_AddToAllObjects(PyObject *op, int force)
{
if(force || op->ob_prev == NULL){
op->_ob_next = refchain._ob_next;
op->_ob_prev = &refchain;
refchain._ob_next->_ob_prev = op;
refchain._ob_next = op;
}
}

void _Py_NewReference(PyObject *op)
{
_Py_INC_REFTOTAL;

// 引用计数器初始化为1
op->ob_refcnt = 1;

// 对象添加到双向链表refchain中
_Py_AddToAllObjects(op, 1);

_Py_INC_TPALLOCS(op);
}

引用

val = 3.14

data = val

项目中这样的引用关系会使原对象的引用计数器+1。

// Inlcude/object.h

static inline void _Py_INCREF(PyObject *op)
{
_Py_INC_REFTOTAL;
// 对象的引用计数器 + 1
op->ob_refcnt++;
}

#define Py_INCREF(op) _Py_INCREF(_PyObject_CAST(op))

销毁

val = 3.14

del 3.14

// Include/object.h
static inline void _Py_DECREF(const char *filename, int lineno, PyObject *op)
{
(void)filename; /* may be unused, shut up -Wunused-parameter */
(void)lineno; /* may be unused, shut up -Wunused-parameter */
_Py_DEC_REFTOTAL;

// 引用计数器-1,如果引用计数器为0,则执行 _Py_Dealloc 去缓存或垃圾回收
if (--op->ob_refcnt != 0) {
#ifdef Py_REF_DEBUG
if (op->ob_refcnt < 0) {
_Py_NegativeRefcount(filename, lineno, op);
}
#endif
} else {
_Py_Dealloc(op);
}
}

#define Py_DECREF(op) _Py_DECREF(__FILE__, __LINE__, _PyObject_CAST(op))
// Objects/object.c
void _Py_Dealloc(PyObject *op)
{
// 找到float类型的 tp_dealloc 函数
destructor dealloc = Py_TYPE(op)->tp_dealloc;

// 在 refchain 双向链表中摘除此对象
_Py_ForgetReference(op);

// 执行 float 类型的 tp_dealloc 函数,去进行缓存或垃圾回收
(*dealloc)(op);
}

void _Py_ForgetReference(PyObject *op)
{
// 在 refchain 链表中移除此对象
op->_ob_next->_ob_prev = op->_ob_prev;
op->_ob_prev->_ob_next = op->_ob_next;
op->_ob_next = op->_ob_prev = NULL;

_Py_INC_TPFREES(op);
}
// objects/floatobject.c

#define PyFloat_MAXFREELIST 100
static int numfree = 0;
static PyFloatObject *free_list = NULL;

// float 类型中函数的对应关系
PyTypeObject PyFloat_Type = {
PyVarObject_HEAD_INIT(&PyType_Type, 0)
"float",
sizeof(PyFloatObject),
0,
// tp_dealloc 表示执行 float_dealloc 方法(destructor: float_dealloc)
(destructor) float_dealloc, /* tp_dealloc */
0, /* tp_print */
};

static void float_dealloc(PyFloatObject *op)
{
// 检测是否是 float 类型
if (PyFloat_CheckExact(op))
{
// 检测 free_list 中缓存的个数是否已满,如果已满,则直接将对象销毁
if (numfree >= PyFloat_MAXFREELIST)
{
PyObject_FREE(op);
return;
}

// 将对象加入到 free_list 链表中
numfree++;
Py_TYPE(op) = (struct _typeobject *) free_list;
free_list = op;
}
else
{
// 如果不是完全匹配,则通过 tp_free 销毁对象
Py_TYPE(op)->tp_free((PyObject *) op);
}
}

5.2 list 类型

创建

v =[11, 22, 33]


二、多线程

1 进程和线程

1.1 定义

线程:是计算机中可以被cpu调度的最小单元(真正在工作)。

进程:是计算机资源分配的最小单元(进程为线程提供资源)。

一个进程中可以有多个线程,同一个进程中的线程可以共享此进程中的资源。

通过进程和线程都可以将串行的程序变成并发。

Python的多线程主要是在用户态中创建线程,在内核态中也会创建线程,并且多个用户线程可以映射到多个内核线程上,形成多对多的映射关系。

1.2 多线程

在同一个进程中开启多个线程执行任务。

子线程开始后,主线程会一直向下走,不会停止。

import time
import requests
import threading

url_list = {
("1.mp4", "https://www.xxx.com/?id=1"),
("2.mp4", "https://www.xxx.com/?id=2"),
("3.mp4", "https://www.xxx.com/?id=3"),
}

def task(file_name, video_url):
res = requests.get(video_url)
with open(file_name, mode='wb') as f:
f.write(res.content)
print(time.time())

for name, url in url_list:
# 创建线程,target是函数,args是参数
t = threading.Thread(target=task, args=(name, url))
# 启动线程
t.start()

1.3 多进程

开启不同的进程执行任务(不同的进程中会自动创建一个线程)。

注:Linux系统使用fork创建进程,Windows使用spawn,Mac支持fork和spawn。在python3.8版本后默认使用spawn创建线程,所以程序会要求创建线程代码必须在if __name__ == '__main__':下,否则会异常。

mac中可以修改创建线程使用的方式来避免改变创建方式。

import time
import requests
import multiprocessing
# mac可以修改创建方式来改变创建方式(写在程序开头位置)
# multiprocessing.set_start_method('fork')

url_list = {
("1.mp4", "https://www.xxx.com/?id=1"),
("2.mp4", "https://www.xxx.com/?id=2"),
("3.mp4", "https://www.xxx.com/?id=3"),
}

def task(file_name, video_url):
res = requests.get(video_url)
with open(file_name, mode='wb') as f:
f.write(res.content)
print(time.time())

# windows下必须放在main中(也可以写成一个函数,只要是在__main__下启动即可)
if __name__ == '__main__':
for name, url in url_list:
t = multiprocessing.Process(target=task, args=(name, url))
t.start()

1.4 GIL锁

GIL,全局解释器锁(Global Interpreter Lock),是CPython解释器特有的东西,让一个进程中同一时刻只有一个线程可以被CPU调度。

如果程序想利用计算机的多核优势,让CPU同时处理一些任务,适合用多进程开发(即使资源开销大)

如果程序不需要计算机的多核优势,适合用多线程开发,比如网络下载文件,多用网卡而非CPU。

常见的程序开发中,计算操作需要使用CPU多核优势,IO操作不需要利用CPU的多核优势:

  • 计算密集型,用多进程,例如∶大量的数据计算【累加计算示例】。
  • IO密集型,用多线程,例如:文件读写、网络数据传输【下载抖音视频示例】。
# 计算密集型(计算0 - 100000000)
import time
import multiprocessing

def task(start, end, queue):
res = 0
for i in range(start, end):
res += 1
queue.put(res)

if __name__ == '__main__':
queue = multiprocessing.Queue()
start_time = time.time()

p1 = multiprocessing.Process(target = task, args=(0, 50000000, queue))
p1.start()

p2 = multiprocessing.Process(target = task, args=(50000000, 100000000, queue))
p2.start()

v1 = queue.get(block=True)
v2 = queue.get(block=True)

print(v1 + v2)

end_time = time.time()
print(end_time - start_time)


# IO密集型(下载文件资源)
import time
import requests
import multiprocessing

url_list = {
("1.mp4", "https://www.xxx.com/?id=1"),
("2.mp4", "https://www.xxx.com/?id=2"),
("3.mp4", "https://www.xxx.com/?id=3"),
}

def task(file_name, video_url):
res = requests.get(video_url)
with open(file_name, mode='wb') as f:
f.write(res.content)
print(time.time())

if __name__ == '__main__':
for name, url in url_list:
t = multiprocessing.Process(target=task, args=(name, url))
t.start()


# 也可以使用综合方式
import multiprocessing
import threading

def thread_task():
print("Thread task executed")

def task(start, end):
t1 = threading.Thread(target=thread_task)
t1.start()

t2 = threading.Thread(target=thread_task)
t2.start()

t3 = threading.Thread(target=thread_task)
t3.start()

if __name__ == "__main__":
p1 = multiprocessing.Process(target=task, args=(0, 5000000))
p1.start()

p2 = multiprocessing.Process(target=task, args=(5000000, 10000000))
p2.start()

2 多线程开发

2.1 常见API

IO密集型,用多线程,例如:文件读写、网络数据传输【下载抖音视频示例】

import threading

# 创建线程,target是函数名,args是传递的函数参数
t = threading.Thread(target=task, args=('xxx', ))

# 线程准备就绪(等待CPU调度,不一定立刻执行),代码向下继续
t.start()

# 等待当前线程的任务执行完毕后再向下继续执行。
t.join()

# 守护线程(必须放在t.start()前)
t.daemon = True # 设置为守护线程,主线程执行完毕后,子线程也自动关闭。
t.daemon = False # 设置为非守护线程,主线程等待子线程,子线程执行完毕后,主线程才关闭。(默认)

# 线程名称设置(必须放在t.start()前)
t.name = "thread_one"
# 线程名称获取
name = threading.current_thread().name
# 自定义线程类
class MyThread(threading.Thread):
# 重写run,当线程被start时会执行此处的代码
def run(self):
# self._args是传递进来的参数
print("执行此线程", self._args)

t = MyThread(args=(100, ))
t.start()

2.2 API使用

t.join()案例中,最终print的结果是随机的,因为存在CPU调度算法,所以一个线程不一定会完全执行完毕,可能在执行过程中就切换了。所以在开启t1t2线程后,有可能加了一部分数,t1就切换成了t2,于是join就过去了,t2同理,所以最后的number就不会是0。

# 流程(t.start())
import threading

def task(arg):
pass

# 创建一个Thread对象(线程),并传递参数
t.threading.Thread(target=task, args=('xxx', ))
# 线程准备就绪(等待CPU调度),代码向下继续
t.start()

# 主线程执行完所有代码后不会结束,会等待所有子线程执行完毕
print("继续执行...")
# 等待线程(t.join())
import threading

loop = 1000000
number = 0

def _add(count):
global number
for i in range(loop):
number += 1

def _sub(count):
global number
for i in range(loop):
number -= 1

t1 = threading.Thread(target=_add, args=(loop, ))
t2 = threading.Thread(target=_sub, args=(loop, ))

t1.start()
t2.start()

t1.join() # t1线程执行完毕,才继续往后走
t2.join() # t2线程执行完毕,才继续往后走

print(number)
# 使用线程类下载
import requests
import threading

class DownloadThread(threading.Thread):
def run(self):
file_name, video_url = self._args
res = requests.get(video_url)
with open(file_name, mode="wb") as f:
f.write(res.content)

url_list = {
("1.mp4", "https://www.xxx.com/?id=1"),
("2.mp4", "https://www.xxx.com/?id=2"),
("3.mp4", "https://www.xxx.com/?id=3"),
}

for item in url_list:
t = DownloadThread(args=(item[0], item[1]))
t.start()

2.3 线程安全

因为CPU在执行的过程中,可能会轮流地执行线程。

因此可以给线程加锁,这样就可以保证一个线程被执行完才会执行另一个线程。

不同线程使用的锁必须是同一把锁,不然就没有加锁的意义了。

在开发的过程中要注意有些操作默认都是线程安全的(内部集成了锁的机制),我们在使用的时无需再通过锁再处理,比如L.append(x)等。

官网:official::Python.org,搜:atomic

根据官网,常见线程安全的操作:

  • L.append(x)
  • L1.extend(L2)
  • x = L[i]
  • x = L.pop()
  • L1[i:j] = L2
  • L.sort()
  • x = y
  • x.field = y
  • D[x] = y
  • D1.update(D2)
  • D.keys()

常见线程不安全的操作:

  • i = i+1
  • L.append(L[-1])
  • L[i] = L[j]
  • D[x] = D[x] + 1
# 创造锁对象
lock_object = threading.RLock()

# 加锁(申请一个锁,申请到了就会加锁,没申请到就会等待)
lock_object.acquire()

# 释放锁(将锁释放出去给别的线程使用)
lock_object.release()
# 解决(t.join()中print结果不为0的问题)
import threading

# 创造锁对象
lock_object = threading.RLock()

loop = 1000000
number = 0

def _add(count):
# 加锁(申请一个锁,申请到了就会加锁,没申请到就会等待)
lock_object.acquire()
global number
for i in range(loop):
number += 1
# 释放锁(将锁释放出去给别的线程使用)
lock_object.release()

def _sub(count):
# 加锁和释放锁也可以写成with的形式,自动acquire和release
with lock_object:
global number
for i in range(loop):
number -= 1

t1 = threading.Thread(target=_add, args=(loop, ))
t2 = threading.Thread(target=_sub, args=(loop, ))

t1.start()
t2.start()

t1.join() # t1线程执行完毕,才继续往后走
t2.join() # t2线程执行完毕,才继续往后走

print(number) # 结果一定为0
# 列表是线程安全的(结果是一定的)
import threading

data_list = []

def task():
print("开始")
for i in range(100000):
data_list.append(i)
print(len(data_list))


for i in range(2):
t = threading.Thread(target=task)
t.start()

2.4 线程锁

在python中,锁有LockRLock。它们功能几乎一致,但是在嵌套使用上存在差异。Lock是同步锁,RLock是嵌套锁,也就是说Lock是不支持锁嵌套的,但是RLock可以进行锁的嵌套。

# Lock允许的情况
import threading

num = 0
lock_object = threading.Lock()

def task():
print("开始")
# 可以在一个函数中多次锁和多次解锁,不同的线程会进行争锁。
lock_object.acquire()
print(threading.current_thread().name)
global num
for i in range(100000):
num += 1
lock_object.release()

lock_object.acquire()
print(threading.current_thread().name)
for i in range(100000):
num += 1
lock_object.release()
print(num)


for i in range(2):
t = threading.Thread(target=task)
t.start()

# Lock不允许的情况(只改task函数)
def task():
print("开始")
# 这样会造成死锁,第一个抢到锁的线程会停在第二个acquire()的位置,其他的会停在第一个。
lock_object.acquire()
print(threading.current_thread().name)
lock_object.acquire()
print(threading.current_thread().name)
global num
for i in range(100000):
num += 1
lock_object.release()
lock_object.release()
# RLock 允许的情况
import threading

num = 0
lock_object = threading.RLock()

def task():
print("开始")
# 可以多次锁和解锁
lock_object.acquire()
lock_object.acquire()
print(threading.current_thread().name)
lock_object.release()
lock_object.release()


for i in range(3):
t = threading.Thread(target=task)
t.start()


# 实际开发中会遇到的情况
import threading

lock = threading.RLock()

# 程序员A写的函数,需要保证线程安全,使用了锁
def func():
with lock:
pass

# 程序员B写的函数,会调用程序员A写的func函数,但不加锁
def run():
print("其他功能")
func() # 调用的函数内部存在锁,但此处不会有什么影响
print("其他功能")

# 程序员C写的函数,会调用程序员A写的func函数,但需要额外加锁
def process():
with lock:
print("其他功能")
func() # 此时就会出现多次锁的情况,需要RLock()
print("其他功能")

2.5 死锁

程序因为锁无法向下进行称为死锁。

死锁的情况:

  • 使用Lock两次加锁会导致死锁。因为Lock本身不可以进行嵌套,所以嵌套使用Lock会导致程序卡死。
  • 使用多把锁时,不同线程互相持有对方需要的锁,互相等待对方释放锁,从而造成死锁
# 情况一:使用Lock两次加锁会导致死锁。
import threading

num = 0
lock_object = threading.Lock()

def task():
print("开始")
# 这样会造成死锁,第一个抢到锁的线程会停在第二个acquire()的位置,其他的会停在第一个。
lock_object.acquire()
print(threading.current_thread().name)
lock_object.acquire()
print(threading.current_thread().name)
global num
for i in range(100000):
num += 1
lock_object.release()
lock_object.release()


for i in range(2):
t = threading.Thread(target=task)
t.start()
# 情况二:多个任务拿多个锁导致卡死
import threading
import time

lock_1 = threading.Lock()
lock_2 = threading.Lock()

# task1拿到1后等待2
def task1():
lock_1.acquire()
print(1)
time.sleep(1)
lock_2.acquire()
print(11)
lock_2.release()
print(111)
lock_1.release()
print(1111)

# task2拿到2后等待1,但因为两个锁都没释放,所以产生了死锁
def task2():
lock_2.acquire()
print(2)
time.sleep(1)
lock_1.acquire()
print(22)
lock_1.release()
print(222)
lock_2.release()
print(2222)


t1 = threading.Thread(target=task1)
t1.start()

t2 = threading.Thread(target=task2)
t2.start()

2.6 线程池

Python3中官方才正式提供线程池。

线程不是开的越多越好,开的多了可能会导致系统的性能更低了。因为线程的上下文切换也需要耗费时间和资源。

不建议:无限制的创建线程。

建议:使用线程池。

在循环中向线程池提交任务时,循环会很快的结束,任务会全部加入到线程池中,但是不是所有任务都会被很快地被执行,而是要看线程池的调度。

案例:

  1. 线程池的使用:会立刻输出END,然后再依次执行子线程任务。因为线程池使用时,线程会立刻被创建完成,然后交给线程池进行调度。

  2. 主线程等待线程池工作完毕:加上pool.shutdown(True)会等待子线程执行完再再向下执行。

  3. 执行完任务后,再额外干点别的:加上future.add_done_callback(done),会在子线程做完操作后再做额外操作。在线程池中,回调是由子线程做的。
  4. 最后统一获得结果:将回调结果加入列表中,最后统一来执行回调结果。
# 如下的代码是不推荐在项目开发中编写。
import threading

def task(video_url):
pass

url_list = ["www.xxxx-{}.com".format(i) for i in range(30000)]

for url in url_list:
t = threading.Thread(target=task, args=(url, ))
t.start()

# 这样每个任务都创建一个线程,线程会特别多,导致效率降低。
# 线程池API
import time
from concurrent.futures import ThreadPoolExecutor

# 创建线程池,n表示线程池最多有n个线程
pool = ThreadPoolExecutor(n)

# 向线程池提交一个任务,第一个是函数名,后面的就是参数
future = pool.submit(func, para_1, para_2, ... )

# 等待线程池中的任务执行完毕后,主线程才会继续向下进行
pool.shutdown(True)

# 线程执行完后的额外操作
future.add_done_callback(func_done)
""" 线程池示例1:线程池的使用 """
import time
from concurrent.futures import ThreadPoolExecutor

def task(video_url, num):
print("开始执行任务", video_url)
time.sleep(5)

# 创建线程池,最多维护10个线程
pool = ThreadPoolExecutor(10)

url_list = ["www.xxx{}.com".format(i) for i in range(300)]
for url in url_list:
# 在线程池中提交一个任务,线程池中如果有空余线程则会分配一个线程去执行,执行完毕后再将线程交还给线程池。如果没有空闲线程则等待。
pool.submit(task, url, 2)

print("END")


""" 线程池案例2:主线程等待线程池工作完毕 """
import time
from concurrent.futures import ThreadPoolExecutor

def task(video_url, num):
print("开始执行任务", video_url)
time.sleep(0.5)

# 创建线程池,最多维护10个线程
pool = ThreadPoolExecutor(10)

url_list = ["www.xxx{}.com".format(i) for i in range(300)]
for url in url_list:
# 在线程池中提交一个任务,线程池中如果有空余线程则会分配一个线程去执行,执行完毕后再将线程交还给线程池。如果没有空闲线程则等待。
pool.submit(task, url, 2)

print("Wait...")
pool.shutdown(True)
print("Next...")


""" 线程池案例3:执行完任务后,再额外干点别的"""
import threading
import time
import random
from concurrent.futures import ThreadPoolExecutor


def task(video_url):
print("开始执行任务", video_url)
# 打印当前线程id
print("执行任务的线程id:", threading.current_thread().ident)
time.sleep(2)
return random.randint(0, 10)


# 线程的返回值是response
def done(response):
# 通过response.result()拿到返回值
print("任务执行后的返回值", response.result())
# 打印回调函数的线程id
print("回调函数的线程id:", threading.current_thread().ident)


pool = ThreadPoolExecutor(10)

url_list = ["www.xxx{}.com".format(i) for i in range(100)]
for url in url_list:
future = pool.submit(task, url)
future.add_done_callback(done)


""" 线程池案例4:最后统一获得结果 """
import time
import random
from concurrent.futures import ThreadPoolExecutor


def task(video_url):
print("开始执行任务", video_url)
time.sleep(2)
return random.randint(0, 10)


pool = ThreadPoolExecutor(10)

# 创建列表
future_list = []

url_list = ["www.xxx{}.com".format(i) for i in range(20)]
for url in url_list:
future = pool.submit(task, url)
future_list.append(future)

# 等线程都完成工作,再循环做后续操作
pool.shutdown(True)
for fu in future_list:
print(fu.result())

2.7 单例模式

多线程单例模式下,因为线程之间的交换,所以可能会导致单例模式创造出来的不是一个单例。

为了解决问题则需要加锁。

# 会引发问题的单例模式
import threading
import time


class Singleton:
instance = None

def __init__(self, name):
self.name = name

def __new__(cls, *args, **kwargs):
if cls.instance:
return cls.instance
# 加上time.sleep(0.1)模拟线程停在此处
time.sleep(0.1)
cls.instance = object.__new__(cls)
return cls.instance


def task():
obj = Singleton('x')
print(obj)


for i in range(10):
t = threading.Thread(target=task)
t.start()
# 加锁解决问题
import threading
import time


class Singleton:
instance = None
lock = threading.RLock()

def __init__(self, name):
self.name = name

def __new__(cls, *args, **kwargs):
if cls.instance:
return cls.instance

with cls.lock:
if cls.instance:
return cls.instance
cls.instance = object.__new__(cls)
return cls.instance


def task():
obj = Singleton('x')
print(obj)


for i in range(10):
t = threading.Thread(target=task)
t.start()

3 多进程开发

3.1 进程定义

进程是计算机中资源分配的最小单元。一个进程中可以有多个线程,同一个进程中的线程共享资源。

进程与进程之间则是相互隔离。

Python中通过多进程可以利用CPU的多核优势,

多进程适用于计算密集型,例如∶大量的数据计算【累加计算示例】。

创建进程模式:

  • fork:”拷贝”几乎所有的资源,支持文件对象 / 线程锁传参,用于unix,可以在代码中任意位置开始,操作较快。
  • spawn:相当于在内部线创建一个python解释器,然后让解释器执行代码,不支持文件对象 / 线程锁传参,用于unix、win,必须从main代码块开始,操作较慢。
  • forkserver:在程序开始前会把多进程部分当做模版加载,然后在启动时会找到模板,拷贝一份执行。不支持文件对象 / 线程锁传参,用于部分unix,必须从main代码块开始。

注:Linux系统使用fork创建进程,Windows使用spawn,Mac支持fork和spawn。在python3.8版本后默认使用spawn创建线程,所以程序会要求创建线程代码必须在if __name__ == '__main__':下,否则会异常。

mac中可以修改创建线程使用的方式来避免改变创建方式。

import time
import requests
import multiprocessing
# mac可以修改创建方式来改变创建方式(写在程序开头位置)
# multiprocessing.set_start_method('fork')

url_list = {
("1.mp4", "https://www.xxx.com/?id=1"),
("2.mp4", "https://www.xxx.com/?id=2"),
("3.mp4", "https://www.xxx.com/?id=3"),
}

def task(file_name, video_url):
res = requests.get(video_url)
with open(file_name, mode='wb') as f:
f.write(res.content)
print(time.time())

# windows下必须放在main中(也可以写成一个函数,只要是在__main__下启动即可)
if __name__ == '__main__':
for name, url in url_list:
t = multiprocessing.Process(target=task, args=(name, url))
t.start()
""" fork案例(仅限unix) """
import multiprocessing
import time

# 案例1:子进程改变不影响主进程
def task():
# 拿到主进程的空列表后改变空列表
print(name)
name.append(123)

if __name__ == '__main__':
multiprocessing.set_start_method("fork")
name = []

p1 = multiprocessing.Process(target=task)
p1.start()

time.sleep(2)
# 返回的是空,子进程是拷贝,不会对主进程有影响
print(name)


# 案例2:子进程拿到的是主进程创建子进程前的数据
def task():
# 拿到主进程的[123]列表
print(name)

if __name__ == '__main__':
multiprocessing.set_start_method("fork")
name = []
name.append(123)

p1 = multiprocessing.Process(target=task)
p1.start()


# 案例3:主改变不影响子
def task():
# 拿到主进程的空列表,主进程的name改变了也不影响子进程的name
print(name)

if __name__ == '__main__':
multiprocessing.set_start_method("fork")
name = []

p1 = multiprocessing.Process(target=task)
p1.start()

name.append(123)
""" spawn案例 """
import multiprocessing
import time

# 案例1:子进程不会直接拿到主进程的数据
def task():
# 直接异常,提示没有name
print(name)

if __name__ == '__main__':
multiprocessing.set_start_method("spawn")
name = []

p1 = multiprocessing.Process(target=task)
p1.start()

# 案例2:传递参数就不会报错
def task(data):
# 子进程拿到data后会拷贝一份给子进程用
print(data)
data.append(999)

if __name__ == '__main__':
multiprocessing.set_start_method("spawn")
name = []

p1 = multiprocessing.Process(target=task, args=(name,))
p1.start()

time.sleep(2)
# name输出空,子进程和主进程用的是不同的数据
print(name)
""" fork和spwan传递资源的区别 """
import multiprocessing
import time

# fork:可以直接传递文件和锁
def task(fb):
# 可以直接传递也可以不传递
print(fb, lock)

if __name__ == '__main__':
multiprocessing.set_start_method("fork")

name = []
file_object = open("1.txt", mode="a+", encoding="utf-8")
lock = threading.RLock()

p1 = multiprocessing.Process(target=task, args=(file_object,))
p1.start()


# spwan:不可以传递文件和锁
def task(fb, lk):
# 会直接报错
print(fb, lk)
# 正确方式是在子进程中重新创建一遍
# file_object = open("1.txt", mode="a+", encoding="utf-8")
# lock = threading.RLock()

if __name__ == '__main__':
multiprocessing.set_start_method("fork")

name = []
file_object = open("1.txt", mode="a+", encoding="utf-8")
lock = threading.RLock()

p1 = multiprocessing.Process(target=task, args=(file_object, lock, ))
p1.start()

3.2 进程案例

案例1:

  • 操作:主进程和子进程都进行文件操作。
  • 结果:武沛齐 alex 武沛齐
  • 原因:子进程拷贝主进程,所以一开时子进程有武沛齐,然后又写入了alex,之后flush()一下,此时文件中就有了武沛齐和alex。又因为主进程再等着子进程,主进程的武沛齐一开始没有写入到文件中,当子进程结束后,主进程结束,此时武沛齐才写入到文件中,所以会出现上述结果。前两个单词是子进程写的,后一个单词是主进程写的。

案例2:

  • 操作:主进程在案例1的基础上先flush()一下。
  • 结果:武沛齐 alex
  • 原因:主进程写入后因为直接进行了flush(),所以直接就先把武沛齐写到文件中了,此时传给子进程的缓存内容是空,所以子进程只会再额外写入alex。此时前一个单词是主进程写的,后一个单词是子进程写的。

案例3:

  • 操作:主进程加锁后传递给子进程。
  • 结果:<locked ... >666
  • 原因:主进程加锁后传递给子进程,子进程拿到锁后,一样是锁的状态,只不过在子进程的锁的对象是子进程中的主线程,而主进程的锁的对象是主进程中的主线程。因为此时锁是RLock(),所以可以进行嵌套锁,所以666会被输出在屏幕上。

案例4:

  • 操作:子进程中创建子线程。
  • 结果:先输出十个”来了”,等2秒后依次输出666。
  • 原因:由案例3可知,因为一开始锁就是锁的状态,然后子进程中锁的作用对象是主线程,所以一开始子进程中的子线程全都会卡主,当子进程的主线程的锁释放后,子进程的子线程才会依次开始执行。
# 案例1:文件操作
import multiprocessing

def task():
print(name)
file_object.write("alex\n")
file_object.flush()

if __name__ == "__main__":
multiprocessing.set_start_method("fork")

name = []
file_object = open('x1.txt', mode='a+', encoding='utf-8')
file_object.write("武沛齐\n")

p1 = multiprocessing.Process(target=task)
p1.start()


# 案例2:在案例1的基础上只在主进程中加一次flush()
import multiprocessing

def task():
print(name)
file_object.write("alex\n")
file_object.flush()

if __name__ == "__main__":
multiprocessing.set_start_method("fork")

name = []
file_object = open('x1.txt', mode='a+', encoding='utf-8')
file_object.write("武沛齐\n")
# 新增代码
file_object.flush()

p1 = multiprocessing.Process(target=task)
p1.start()


# 案例3:锁操作
import multiprocessing
import threading

def task():
print(lock)
lock.acquire()
print(666)

if __name__ == "__main__":
multiprocessing.set_start_method("fork")
name = []
lock = threading.RLock()
lock.acquire()

p1 = multiprocessing.Process(target=task)
p1.start()


# 案例4:子进程创建子线程(main同案例3)
def func():
print("来了")
with lock:
print(666)

def task():
for i in range(10):
t = threading.Thread(target=func)
t.start()
time.sleep(2)
lock.release()

3.3 进程API

import multiprocessing

# 设置创建子进程的模式('fork'、'spawn')
multiprocessing.set_start_method("fork")

# 创建子进程(函数名,参数)
p = multiprocessing.Process(target=task, args=("xxx", ))

# 启动子进程
p.start()

# 等待子进程执行完毕再向下执行
p.join()

# 设置守护进程(必须放在start前)
# 守护进程,主进程执行完后子进程也关闭
p.daemon = True
# 非守护进程,主进程等待子进程执行完毕后,主进程才关闭(默认)
p.daemon = False

# 设置名字
p.name = "进程1"

# 获取名字
multiprocessing.current_process().name

# 查看CPU个数(一般cpu有几个就创建几个子进程)
multiprocessing.cpu_count()

""" 补充知识 """
import os
# 获得当前进程id
os.getpid()
# 获得父进程id
os.getppid()


import threading
# 获取进程里所有线程,存入列表中
lst = threading.enumerate()
# 自定义线程类
import multiprocessing

class MyProcess(multiprocessing.Process):
def run(self):
# 获得参数
print("执行次线程", self._args)


if __name__ == "__main__":
multiprocessing.set_start_method("spawn")
p = MyProcess(args=("xxx", ))
p.start()
print("继续执行...")

3.4 数据共享

默认情况下进程之间的资源是独立的,不进行共享。

如果要让他们之间进行共享,需要借助特殊方式实现。

特殊方式:

  • 使用ValueArray。通过这两者创建的数据可以进行共享。但是因为是比较底层,所以使用较少。
  • 使用Manager()。通过Manager()创建的数据类型,可以进行共享。因为可以用python语法,使用较舒服。
  • 使用multiprocessing.Queue()。就是一个队列,只不过这个队列可以实现资源共享,且不会数据混乱。使用较多。
  • 使用multiprocessing.Pipe()。双端队列,可以双向通信,资源共享,且不会数据混乱。使用较多。

上述都是Python内部提供的进程之间数据共享和交换的机制,作为了解即可,在项目开发中很少使用,后期项目中一般会借助第三方的来做资源的共享,例如:MySQL,Redis等。

value参数 代表的类型
c ctypes.c_char
b ctypes.c_byte
h ctypes.c_short
i ctypes.c_int
l ctypes.c_long
f ctypes.c_float
u ctypes.c_wchar
B ctypes.c_ubyte
H ctypes.c_ushort
I ctypes.c_uint
L ctypes.c_ulong
d ctypes.c_double
""" Value和Array方式 """
from multiprocessing import Process, Value, Array

# 案例1:普通数据操作
def func(n, m1, m2):
n.value = 888
m1.value = 'a'.encode('utf-8')
m2.value = '武'

if __name__ == '__main__':
# 设置类型,默认值
num = Value('i', 666)
v1 = Value('c', b' ')
v2 = Value('u', ' ')

p = Process(target=func, args=(num, v1, v2))
p.start()
p.join()

print(num.value) # 888
print(v1.value) # b'a'
print(v2.value) # 武


# 案例2:数组数据操作
def f(data_array):
data_array[0] = 666

if __name__ == '__main__':
# 创建数组,就是C语言的数组(固定类型和长度)
arr = Array('i', [11, 22, 33, 44])

p = Process(target=f, args=(arr,))
p.start()
p.join()

print(arr[:])
""" Manager()方式 """
from multiprocessing import Process, Manager

def f(d, l):
d[1] = '1'
d['2'] = 2
d[0.25] = None
l.append(666)

if __name__ == "__main__":
with Manager() as manager:
d = manager.dict()
l = manager.list()

p = Process(target=f, args=(d, l))
p.start()
p.join()

print(d)
print(l)

""" Queue()方式 """
import multiprocessing

# 定义子进程任务函数,将数据放入队列
def task(q):
for i in range(10):
q.put(i)

if __name__ == "__main__":
# 创建一个多进程安全的队列
queue = multiprocessing.Queue()

# 创建子进程并传入队列作为参数
p = multiprocessing.Process(target=task, args=(queue,))
p.start()

# 等待子进程完成
p.join()

# 主进程中从队列中获取数据并打印
print("主进程开始获取数据:")
for _ in range(10):
print(queue.get())
""" Pipe()方式 """
import time
import multiprocessing

# 定义子进程任务函数,通过管道与主进程通信
def task(child_conn):
time.sleep(1)
child_conn.send([111, 22, 33, 44]) # 子进程发送数据
data = child_conn.recv() # 阻塞,等待接收主进程发送的数据
print("子进程接收:", data)
time.sleep(2)

if __name__ == "__main__":
# 创建一个管道,返回一对连接对象,一个是父进程端(parent_conn),另一个是子进程端(child_conn)
parent_conn, child_conn = multiprocessing.Pipe()

# 创建子进程并传入子进程端的连接对象
p = multiprocessing.Process(target=task, args=(child_conn,))
p.start()

# 主进程接收子进程发送的数据
info = parent_conn.recv()
print("主进程接收:", info)

# 主进程向子进程发送数据
parent_conn.send(666)

# 为了保证子进程能顺利完成任务,这里应等待子进程结束后再结束主进程
p.join()

3.5 进程锁

进程锁:使用类似于线程锁,但是线程锁不能作为参数传递到子进程中,而进程锁是可以传递到子进程中的。

spawn模式下,在主进程的结尾处需要做一些特殊的处理,不然可能会报错:

  • 使用time.sleep(7)等待7秒。
  • 使用p.join(),主进程等待所有子进程完成后再向下。
import time
import multiprocessing

def task(lock):
print("开始")
lock.acquire()

# 读取文件中的值
with open('f1.txt', mode='r', encoding="utf-8") as f:
current_num = int(f.read())

print("排队抢票了")
time.sleep(0.5)

current_num -= 1

# 更新文件中的值
with open('f1.txt', mode='w', encoding="utf-8") as f:
f.write(str(current_num))

lock.release()

# spawn模式下特殊处理方法1
if __name__ == '__main__':
multiprocessing.set_start_method("spawn")
# 进程锁,可以作为参数传递给子进程
lock = multiprocessing.RLock()

for i in range(10):
p = multiprocessing.Process(target=task, args=(lock,))
p.start()

for p in process_list:
p.join()

# 等待7秒
time.sleep(7)


# 方法2
if __name__ == '__main__':
multiprocessing.set_start_method("spawn")
# 进程锁,可以作为参数传递给子进程
lock = multiprocessing.RLock()

process_list = []
for i in range(10):
p = multiprocessing.Process(target=task, args=(lock,))
p.start()
process_list.append(p)

# 依次等待任务执行完毕
for item in process_list:
item.join()

3.6 进程池

案例:

  1. 线程池的使用:会立刻输出1,然后再依次执行子进程任务。因为进程池使用时,进程会立刻被创建完成,然后交给进程池进行调度。

  2. 主线程等待线程池工作完毕:加上pool.shutdown(True),会等待子进程执行完再再向下执行。

  3. 执行完任务后,再额外干点别的:加上fur.add_done_callback(done),会在子进程做完操作后再做额外操作,但是此处和线程池有区别。进程池中的回调都是由主进程进行操作。
  4. 在子进程中使用进程锁:在进程池中加锁,需要用到Manager()中的Lock()RLock(),不能使用自带的进程锁。因为在multiprocessing模块中,每个进程都有自己独立的内存空间,因此无法直接共享普通的线程锁,而Manager()中的Lock()RLock()是专门为进程间通信设计的锁,所以应该使用Manager()中的进程锁。
# 进程池API
import time
from concurrent.futures import ProcessPoolExecutor

# 创建进程池,n表示进程池最多有n个进程
pool = ProcessPoolExecutor(n)

# 向进程池提交一个任务,第一个是函数名,后面的就是参数
future = pool.submit(func, para_1, para_2, ... )

# 等待进程池中的任务执行完毕后,主线程才会继续向下进行
pool.shutdown(True)

# 线程执行完后的额外操作
future.add_done_callback(func_done)

# 回调函数传参(利用闭包)
def outer(info, file_name):
def done(res, *args, **kwargs):
info[file_name] = res.result()
return done
fur.add_done_callback(outer(info, file_name))
""" 线程池示例1:线程池的使用 """
import time
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor

def task(num):
print("执行", num)
time.sleep(2)

if __name__ == '__main__':
pool = ProcessPoolExecutor(4)
for i in range(10):
pool.submit(task, i)

print(1)


""" 线程池案例2:主线程等待线程池工作完毕 """
import time
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor

def task(num):
print("执行", num)
time.sleep(2)

if __name__ == '__main__':

pool = ProcessPoolExecutor(4)
for i in range(10):
pool.submit(task, i)

pool.shutdown(True)
print(1)

# 或者用with也能实现相同功能
# with ProcessPoolExecutor(4) as pool:
# for i in range(10):
# pool.submit(task, i)
# print(1)


""" 线程池案例3:执行完任务后,再额外干点别的"""
import time
from concurrent.futures import ProcessPoolExecutor
import multiprocessing

def task(num):
print("执行", num)
time.sleep(2)
return num

def done(res):
# 发现是主进程在做操作
print(multiprocessing.current_process())
time.sleep(1)
# 通过res.result()拿到返回值
print(res.result())
time.sleep(1)

if __name__ == '__main__':
pool = ProcessPoolExecutor(4)

for i in range(50):
fur = pool.submit(task, i)
fur.add_done_callback(done) # done的调用由主进程处理(与线程池不同)

print(multiprocessing.current_process())
pool.shutdown(True)


""" 线程池案例4:在子进程中使用进程锁 """
import time
import multiprocessing
from concurrent.futures import ProcessPoolExecutor

def task(lock):
print("开始")

# 不能使用
# lock.acquire()
# lock.release()

with lock:
# 假设文件中保存的内容就是一个值:10
with open('f1.txt', mode='r', encoding='utf-8') as f:
current_num = int(f.read())

print("排队抢票了")
time.sleep(1)

current_num -= 1

# 更新文件中的值
with open('f1.txt', mode='w', encoding='utf-8') as f:
f.write(str(current_num))

if __name__ == '__main__':
pool = ProcessPoolExecutor()
# 不能使用这个进程锁
# lock_object = multiprocessing.RLock()
manager = multiprocessing.Manager()
lock_object = manager.RLock()

for _ in range(10):
pool.submit(task, lock_object)

3.7 多进程案例

要求:统计日志文件中的数据量和ip种数。

数据存在files文件夹下,格式:127.0.0.1 - - [21/Mar/2021] "GET" ......

import os
import time
from concurrent.futures import ProcessPoolExecutor

def task(file_name):
ip_set = set()
total_count = ip_count = 0
file_path = os.path.join("files", file_name)

with open(file_path, mode='r', encoding='utf-8') as file_object:
for line in file_object:
if not line.strip():
continue
user_ip = line.split(" - -", maxsplit=1)[0].split(", ")[0]
total_count += 1
if user_ip in ip_set:
continue
ip_count += 1
ip_set.add(user_ip)
time.sleep(1)

return {"total": total_count, 'ip': ip_count}


def outer(info, file_name):
def done(res, *args, **kwargs):
info[file_name] = res.result()
return done


def run():
# 根据目录读取文件并初始化字典
info = {}
pool = ProcessPoolExecutor(4)

for file_name in os.listdir("files"):
fur = pool.submit(task, file_name)
fur.add_done_callback(outer(info, file_name))

pool.shutdown(True)

for k, v in info.items():
print(k, v)

if __name__ == '__main__':
run()

4 协程

4.1 定义

了解为主。

计算机中提供了:线程、进程用于实现并发编程(真实存在)。

协程(Coroutine),是程序员通过代码搞出来的一个东西(非真实存在)。

协程也可以被称为微线程,是一种用户态内的上下文切换技术。

简而言之,其实就是通过一个线程实现代码块相互切换执行(来回跳着执行)。

下列greenlet和yield可以模拟协程,但是一般不这么使用。

# 利用greenlet模拟协程(pip install greenlet)
from greenlet import greenlet

def func1():
print(1) # 2.输出1
gr2.switch() # 3.切换到func2
print(2) # 6.输出2
gr2.switch() # 7.切换到func2,从上一次执行位置向后执行

def func2():
print(3) # 2.输出3
gr1.switch() # 2.切换到func1,从上一次执行位置向后执行
print(4) # 2.输出4

gr1 = greenlet(func1)
gr2 = greenlet(func2)
gr1.switch() # 1.执行func1函数
# yield模拟协程
def func1():
yield 1
yield from func2()
yield 2


def func2():
yield 3
yield 4


f1 = func1()
# 输出结果是1 3 4 2
for item in f1:
print(item)

4.2 asyncio

协程如何才能更有意义呢:不要让用户手动去切换,而是遇到IO操作时能自动切换。

Python在3.4之后推出了asyncio模块+ Python3.5推出asyncasync语法,内部基于协程并且遇到IO请求自动化切换。

# asyncio的API
# async定义一个协程
async def func():

# await用来挂起阻塞方法的执行
await func();

# 阻塞n秒
asyncio.sleep(n)

# 创建一个任务(Task)并立即运行
asyncio.ensure_future(func1())

# 初始化事件循环
loop = asyncio.new_event_loop()

# 设置时间循环
asyncio.set_event_loop(loop)

# 获取事件循环
loop = asyncio.get_event_loop()

# 等待多个协程任务完成
asyncio.wait(tasks)

# 运行事件循环,直到完成所有任务
loop.run_until_complete(asyncio.wait(tasks))
# asyncio案例
import asyncio

async def func1():
print(1)
await asyncio.sleep(2)
print(2)

async def func2():
print(3)
await asyncio.sleep(2)
print(4)

tasks = [
asyncio.ensure_future(func1()),
asyncio.ensure_future(func2())
]

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

4.3 aiohttp案例

利用协程来爬取图片。

需要先安装aiohttp

实际使用中,一般不在协程中做数据处理,而是只负责获取数据,获取的数据直接放到队列、文件或数据库里。

后续学习:asyncio异步编程,你搞懂了吗? - 知乎 (zhihu.com)

后续学习:asyncio到底是个啥?【python async await】_哔哩哔哩_bilibili

import aiohttp
import asyncio
import urllib

async def download_image(session, url):
print(f"开始下载: {url}")
async with session.get(url, verify_ssl=False) as response:
content = await response.content.read()
return content


async def save_image(content, filename):
print(f"开始保存: {filename}")
with open(filename, mode='wb') as file_object:
file_object.write(content)


async def fetch_and_save(session, url):
# 下载图片
content = await download_image(session, url)

# 解析 URL 并提取文件名
parsed_url = urllib.parse.urlparse(url)
path_parts = parsed_url.path.split('/')
file_name = path_parts[-1]

# 如果原始 URL 末尾没有文件扩展名,则加上预设的 .jpg 扩展名
if '.' not in file_name:
file_name += ".jpg"

# 保存图片到本地
await save_image(content, file_name)


async def main():
async with aiohttp.ClientSession() as session:
url_list = [
'https://gitcode.net/qq_44112897/images/-/raw/master/comic/25.jpg',
'https://gitcode.net/qq_44112897/images/-/raw/master/comic/26.jpg',
'https://gitcode.net/qq_44112897/images/-/raw/master/comic/27.jpg'
]

tasks = [asyncio.create_task(fetch_and_save(session, url)) for url in url_list]
await asyncio.wait(tasks)


if __name__ == '__main__':
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(main())